Rework BatchedSend logic #661
Conversation
Does away with the timeout and looking up a private attribute on IOStream. Refs PR dask#653.
```python
except Exception:
    logger.exception("Error in batched write")
    break
self.next_deadline = self.loop.time() + self.interval
```
We might want to base the next deadline on when we started the last send rather than when we finished it.
Well, I don't know. What are the intended semantics?
This class generally tries to solve the problem of streams on which we want to send thousands of small messages per second, such as might occur in the following situation:
```python
futures = []
for x in range(10000):
    future = client.submit(inc, x)
    futures.append(future)
```
(or on the worker to scheduler side, as the worker reports status updates)
We found that these situations were significantly faster if we never sent two messages within a few milliseconds of each other, preferring instead to batch them. If it has been more than a few milliseconds since the last payload was dispatched and the last payload has finished, then I think we should be able to send again.
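A minimal sketch of the intended semantics (this is an illustration, not the actual BatchedSend code; the class name and `interval` parameter here are hypothetical): a new send is permitted once the interval has elapsed since the previous send *started*:

```python
import time


class SendPolicy:
    """Illustrative batching policy: permit a new send only when
    `interval` seconds have passed since the previous send began."""

    def __init__(self, interval):
        self.interval = interval
        self.last_send_start = None

    def may_send(self, now=None):
        now = time.monotonic() if now is None else now
        if self.last_send_start is None:
            return True  # nothing sent yet
        return now - self.last_send_start >= self.interval

    def record_send(self, now=None):
        # Base the next deadline on when this send started.
        self.last_send_start = time.monotonic() if now is None else now


policy = SendPolicy(interval=0.002)
assert policy.may_send(now=0.0)
policy.record_send(now=0.0)
assert not policy.may_send(now=0.001)  # too soon: still within the interval
assert policy.may_send(now=0.003)      # interval elapsed since the send started
```

Basing the deadline on the start of the send means a slow write does not push the next deadline even further out.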
Then the `yield write(...)` wasn't really useful in the previous version? There's no need to wait on the write future if we want to base the deadline on the start of the write operation.
Related question: what is `gen.with_timeout(timedelta(seconds=0.01), ...)` for in `d.core.write`?
That is related to what was going on here and is also a possible source of error. Some explanation here: #653
Oh, right. We can remove it.
```python
if self.next_deadline is not None:
    delay = self.next_deadline - self.loop.time()
    if delay > 0:
        yield gen.sleep(delay)
```
Why this added delay?
It mirrors the `yield self.last_send` that was here previously. Perhaps I'm misunderstanding the intent :-)
```diff
@@ -37,7 +37,7 @@ def handle_stream(self, stream, address):
             self.count += 1
             yield write(stream, msg)
         except StreamClosedError as e:
-            pass
+            return
```
I would expect this to be a syntax error in Python 2
No, only `return` with an explicit value is forbidden. A bare `return` allows exiting the coroutine.
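For illustration (hypothetical function, not from the PR): a bare `return` exits a generator early and is legal on both Python 2 and 3, whereas `return value` inside a generator was a syntax error on Python 2:

```python
def reader(messages):
    """Yield messages until a sentinel appears."""
    for msg in messages:
        if msg is None:   # e.g. stream closed
            return        # bare return: allowed in Python 2 generators
        yield msg


assert list(reader(["a", "b", None, "c"])) == ["a", "b"]
assert list(reader([])) == []
```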
This looks pretty nice to me.
By the way,
Removing it sounds fine to me. Generally I am happy to yield to your judgment on anything related to this issue. I suspect that you have a lot more experience here than I do.
Testing failures here are unrelated. Addressing them in #662.
```python
        break
    except gen.TimeoutError:
        pass
yield stream.write(frames[-1])
```
I think that this could fail if we write to the same stream in another coroutine. Ideally we shouldn't do this. Normally the `rpc` class creates new streams as necessary to handle concurrent communications to the same destination. In all cases I can find where a coroutine writes directly to a stream, it creates and owns that stream exclusively.
Still though, we were running into problems in the wild that suggested that this might be an issue.
I'm not sure how that's different from the old code, though? It would also wait on `futures[-1]` and only catch timeout errors.
We would raise a timeout error if the future didn't complete quickly and then fall back to checking if the stream's write_buffer was empty.
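The pattern being described can be sketched with asyncio (an analogue only; the actual code uses Tornado's `gen.with_timeout`, and the helper name here is hypothetical): wait briefly on the write future, and on timeout fall back to inspecting the buffer instead:

```python
import asyncio


async def wait_briefly(fut, timeout=0.01):
    """Wait up to `timeout` for a write future; report whether it finished."""
    try:
        # shield() keeps the underlying future alive if we time out
        await asyncio.wait_for(asyncio.shield(fut), timeout)
        return True
    except asyncio.TimeoutError:
        return False  # caller falls back to checking the write buffer


async def demo():
    loop = asyncio.get_running_loop()
    fast = loop.create_future()
    fast.set_result(None)          # already completed: no timeout
    slow = loop.create_future()    # never completes: times out
    return await wait_briefly(fast), await wait_briefly(slow)


assert asyncio.run(demo()) == (True, False)
```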
I'm a bit surprised that this would make a difference. What were the symptoms of the problems?
Somewhere some coroutine is stuck waiting on `yield write(...)`. This first occurred on `yield self.last_write` within BatchedSend, and resulted in messages waiting in the worker's message buffer.
You're right. That's because `IOStream.write` can forget previous futures. So how about we don't wait for the write at all? We could simply `yield gen.moment` so that `write()` remains a coroutine...
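An asyncio analogue of the idea (hypothetical names; the real code would use Tornado's `yield gen.moment`): hand the data to an outgoing buffer without awaiting delivery, then yield control exactly once so the function stays a coroutine:

```python
import asyncio


async def write_nowait(buffer, frames):
    """Queue frames for sending without awaiting their delivery."""
    buffer.extend(frames)
    # Yield control once, like Tornado's ``yield gen.moment``, so this
    # function remains a coroutine without blocking on the transport.
    await asyncio.sleep(0)


outgoing = []
asyncio.run(write_nowait(outgoing, [b"frame1", b"frame2"]))
assert outgoing == [b"frame1", b"frame2"]
```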
I think we would want to yield on the write if we were going to apply backpressure. We're not doing this yet at other stages though so yes, I suspect that that would work fine. We're moving the data pile-up from the BatchedSend buffer to the Tornado write buffer, which is probably appropriate anyway.
Turns out we must wait for the write() to be issued before closing the stream. This is gonna be a bit hairy...
More or less hairy than polling on the _write_buffer?
I think the solution is to flush the stream explicitly before closing. Let me try it out.
```python
    This is recommended before closing the stream.
    """
    if stream.writing():
        yield stream.write(b'')
```
How do we know that this will complete?
The API expects that `write()` isn't called before `flush()` completes.
Are we confident that this expectation is fulfilled?
I've updated the docstring to better inform the reader. But perhaps we only want to expose `close()` so that we don't make any further mistakes. What do you think?
I'm mostly concerned about us as users. Only very advanced dask/distributed users should use read/write/close directly.
However, given that we've had problems reported it's possible that we aren't handling everything well internally.
I have no problem with read/write/close as an API generally.
""" | ||
if not stream.closed(): | ||
try: | ||
flush(stream) |
Should we yield on this?
Yes, you're right, my bad.
It would be good to develop some tests to stress communication in a few ways. However, I'm not entirely sure how this needs to be stressed. One thing that I've found to be useful in the past is to change
Thoughts on ignoring the RuntimeError around the h5py test?
This all seems fine to me. +1
I would hope h5py merges the pull request that would fix the issue.
I do not expect h5py to merge or release quickly.